/*******************************************************************************
 * Package: com.song.bigdata.stream.table
 * Type:    Dome
 * Date:    2022-10-30 21:25
 *
 * Copyright (c) 2022 HUANENG GUICHENG TRUST CORP.,LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.bigdata.stream.table;

import com.song.bigdata.stream.pojo.User;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Demo of the Flink Table API: expresses stream processing as SQL over a small
 * in-memory stream of {@link User} records.
 *
 * <p>The six sample records are converted into a table and queried twice:
 * <ul>
 *   <li>a projection of {@code name}, converted back with {@code toDataStream}
 *       and printed without a label;</li>
 *   <li>{@code sum(age)} grouped by {@code name} for {@code 'song-user'} only,
 *       converted with {@code toChangelogStream} and printed under the label
 *       {@code sfsfsdd}.</li>
 * </ul>
 *
 * <p>Sample output recorded by the original author:
 * <pre>
 * +I[song-user, 12]
 * +I[xian-user, 92]
 * +I[song-user, 11]
 * +I[song-user, 110]
 * +I[song-user, 90]
 * +I[xian-user, 10]
 * </pre>
 * NOTE(review): these rows carry two columns, whereas the unlabelled stream below
 * selects only {@code name} - the sample presumably predates the current queries;
 * confirm by re-running the job.
 *
 * @author Songxianyang
 * @date 2022-10-30 21:25
 */
public class Dome {
    public static void main(String[] args) throws Exception {
        // Create the streaming environment and run everything with parallelism 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // In-memory data source.
        DataStreamSource<User> users = sampleUsers(env);

        // Table API environment layered on top of the streaming environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Expose the source stream as a table so it can be referenced from SQL.
        Table tableUser = tableEnv.fromDataStream(users);

        // Concatenating the Table into the SQL text inlines it into the query.
        String namesSql = "select name from " + tableUser;
        Table namesOnly = tableEnv.sqlQuery(namesSql);

        String totalsSql = "select name,sum(age) from " + tableUser
                + " where  name='song-user' group by name ";
        Table ageTotals = tableEnv.sqlQuery(totalsSql);

        // Projection result: converted back to a stream of rows (insert-only output).
        DataStream<Row> nameRows = tableEnv.toDataStream(namesOnly);

        // Aggregated result: emitted as a changelog, i.e. output that includes updates.
        DataStream<Row> totalChanges = tableEnv.toChangelogStream(ageTotals);
        totalChanges.print("sfsfsdd");

        // Print the projected rows.
        nameRows.print();

        env.execute();
    }

    /** Builds the fixed six-element sample source used by this demo. */
    private static DataStreamSource<User> sampleUsers(StreamExecutionEnvironment env) {
        return env.fromElements(
                new User("song-user", 12),
                new User("xian-user", 92),
                new User("song-user", 11),
                new User("song-user", 110),
                new User("song-user", 90),
                new User("xian-user", 10));
    }
}
